package cn.zhaopin.starter.mq.support;

import cn.zhaopin.starter.mq.common.PulsarMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
 * Description: custom JSON {@link SchemaReader} that decodes {@link PulsarMessage}
 * payloads using a caller-supplied Jackson {@link ObjectMapper}.
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/22-10:10
 */
public class Jackson2JsonReader implements SchemaReader<PulsarMessage> {

    private static final Logger log = LoggerFactory.getLogger(Jackson2JsonReader.class);

    private final ObjectMapper objectMapper;

    /**
     * Creates a reader backed by the given mapper.
     *
     * @param objectMapper mapper used for every decode call; must not be {@code null}
     * @throws NullPointerException if {@code objectMapper} is {@code null}
     */
    Jackson2JsonReader(ObjectMapper objectMapper) {
        // Fail fast here instead of on the first read, far away from the misconfiguration.
        this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null");
    }

    /**
     * Decodes a {@link PulsarMessage} from a slice of a byte array.
     *
     * @param bytes  buffer holding the JSON payload
     * @param offset index of the first byte to read
     * @param length number of bytes to read
     * @return the decoded message
     * @throws SchemaSerializationException if the mapper reports an {@link IOException}
     */
    @Override
    public PulsarMessage read(byte[] bytes, int offset, int length) {
        try {
            return objectMapper.readValue(bytes, offset, length, PulsarMessage.class);
        } catch (IOException e) {
            throw new SchemaSerializationException(e);
        }
    }

    /**
     * Decodes a {@link PulsarMessage} from a stream. The stream is closed before this
     * method returns, whether or not decoding succeeds.
     *
     * @param inputStream stream holding the JSON payload; must not be {@code null}
     * @return the decoded message
     * @throws NullPointerException         if {@code inputStream} is {@code null}
     * @throws SchemaSerializationException if the mapper reports an {@link IOException}
     */
    @Override
    public PulsarMessage read(InputStream inputStream) {
        // Explicit precondition: without it a null stream would surface as a bare NPE
        // from the finally block, replacing whatever the read attempt had thrown.
        Objects.requireNonNull(inputStream, "inputStream must not be null");
        try {
            return objectMapper.readValue(inputStream, PulsarMessage.class);
        } catch (IOException e) {
            throw new SchemaSerializationException(e);
        } finally {
            closeQuietly(inputStream);
        }
    }

    /**
     * Closes the stream, logging instead of propagating a close failure so that it can
     * neither discard a decoded message nor replace a decoding exception.
     */
    private static void closeQuietly(InputStream inputStream) {
        try {
            inputStream.close();
        } catch (IOException e) {
            log.error("Json2Reader close inputStream close error", e);
        }
    }
}
